/*
 * Collie - An asynchronous event-driven network framework using Dlang development
 *
 * Copyright (C) 2015-2017  Shanghai Putao Technology Co., Ltd
 *
 * Developer: putao's Dlang team
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module collie.net.server.tcpserver;

import std.socket;

import kiss.exception;
import kiss.net.TcpListener;
import kiss.net.TcpStream;
import kiss.event.timer.common;
import kiss.util.timer;
import kiss.event;
import kiss.event.task;

import collie.net.server.connection;
import collie.net.server.exception;

/**
 * TCP server that owns a single `TcpListener` bound to one address and hands
 * every accepted `TcpStream` to a user supplied callback.
 *
 * Typical call order, as enforced by the checks in the methods below:
 * `bind` -> `setNewConntionCallBack` -> (optionally `startTimeout`) -> `listen`.
 */
final class TCPServer
{
    /// Callback invoked for every accepted stream; it must return the
    /// `ServerConnection` wrapping that stream (must not be null).
    alias NewConnection = ServerConnection delegate(TcpListener sender, TcpStream stream);
    /// Hook invoked by `bind` right after the listener is created and before
    /// it is bound, so the caller can configure the listener.
    alias OnAceptorCreator = void delegate(kiss.net.TcpListener.TcpListener);

    /**
     * Params:
     *   loop = event loop used for the listener, the timer and all posted tasks.
     */
    this(EventLoop loop)
    {
        _loop = loop;
    }

    /// The listener created by `bind`, or null when `bind` was not called yet.
    @property tcpListener()
    {
        return _TcpListener;
    }

    /// The event loop passed to the constructor.
    @property eventLoop()
    {
        return _loop;
    }

    /// The address passed to `bind`, or null when `bind` was not called yet.
    @property bindAddress()
    {
        return _bind;
    }

    /// The value last passed to `startTimeout` (0 until then).
    @property timeout()
    {
        return _timeout;
    }

    /**
     * Creates the listener for `addr`'s address family and binds it.
     *
     * Params:
     *   addr = address to bind to.
     *   ona  = optional hook called with the freshly created listener
     *          before it is bound.
     *
     * Throws: SocketBindException when a listener already exists.
     */
    void bind(Address addr, OnAceptorCreator ona = null)
    {
        if (_TcpListener !is null)
            throw new SocketBindException("the server is areadly binded!");

        _bind = addr;
        _TcpListener = new TcpListener(_loop, addr.addressFamily);
        if (ona)
            ona(_TcpListener);
        _TcpListener.bind(_bind);
    }

    /**
     * Registers the accept handler and schedules the listener start on the
     * event loop.
     *
     * Params:
     *   block = forwarded unchanged to `TcpListener.listen`
     *           (presumably the accept backlog - TODO confirm).
     *
     * Throws:
     *   SocketBindException   when `bind` was not called.
     *   SocketServerException when no new-connection callback is set.
     */
    void listen(int block)
    {
        if (_TcpListener is null)
            throw new SocketBindException("the server is not bind!");
        if (_cback is null)
            throw new SocketServerException("Please set CallBack frist!");

        // _TcpListener.onPeerCreating(&createTcpStream);
        _TcpListener.onConnectionAccepted(&newConnect);
        // The actual listen/start is deferred to a task posted on the loop.
        _loop.postTask(newTask(() { _TcpListener.listen(block).start(); }));
    }

    /// Sets the callback that turns an accepted stream into a `ServerConnection`.
    void setNewConntionCallBack(NewConnection cback)
    {
        _cback = cback;
    }

    /**
     * Creates the timing wheel and the timer that advances it.
     *
     * The wheel size is picked from a fixed table keyed by the timeout, and
     * the timer interval is `s * 1000 / wheelSize`, so that one full turn of
     * the wheel takes about `s` seconds.
     *
     * Params:
     *   s = timeout in seconds; 0 records the value and creates nothing.
     *
     * Throws: SocketServerException when a wheel already exists.
     */
    void startTimeout(uint s)
    {
        if (_wheel !is null)
            throw new SocketServerException("TimeOut is runing!");

        _timeout = s;
        if (_timeout == 0)
            return;

        // Upper timeout bound (seconds) -> number of wheel slots.
        // Unsigned on purpose: the last bound is uint.max so every value of
        // `s` matches an entry. Static immutable storage avoids re-creating
        // the tables on each use, which an `enum` array literal would do.
        static immutable uint[5] limits = [40, 120, 600, 1000, uint.max];
        static immutable uint[5] slotCounts = [50, 60, 100, 150, 300];

        uint whileSize;
        foreach (i, limit; limits)
        {
            if (s <= limit)
            {
                whileSize = slotCounts[i];
                break;
            }
        }

        // Do the multiplication in 64 bits: `s * 1000` wraps around in uint
        // arithmetic for timeouts above ~4.29 million seconds. Clamp the
        // result so the narrowing back to uint cannot truncate.
        const ulong intervalMs = cast(ulong) s * 1000 / whileSize;
        uint time = intervalMs > uint.max ? uint.max : cast(uint) intervalMs;

        _wheel = new TimingWheel(whileSize);
        _timer = new KissTimer(_loop, time);
        _timer.onTick((Object) { _wheel.prevWheel(); });
        //_timer.start(time);
        _loop.postTask(newTask(() { _timer.start(); }));
    }

    /// Schedules the listener's `close` on the event loop; no-op before `bind`.
    void close()
    {
        if (_TcpListener)
            _loop.postTask(newTask(&_TcpListener.close));
    }

protected:

    /**
     * Builds the stream object for an accepted socket.
     *
     * Without `version(USE_SSL)` this is a plain `TcpStream`. With it, an
     * `SSLSocket` in accept state is created from `_ssl_Ctx`.
     *
     * NOTE(review): this method is currently not registered anywhere (see the
     * commented-out `onPeerCreating` line in `listen`).
     * NOTE(review): with USE_SSL and a null `_ssl_Ctx` this returns null, as
     * in the original code - confirm whether a plain stream is wanted instead.
     *
     * Returns: the new stream, or null when the SSL setup failed.
     */
    TcpStream createTcpStream(TcpListener sender, Socket sock, size_t bufferSize)
    {
        TcpStream tcpStream;
        EventLoop loop = cast(EventLoop) sender.eventLoop;
        version (USE_SSL)
        {
            if (_ssl_Ctx)
            {
                import collie.net.common;

                auto ssl = SSL_new(_ssl_Ctx);
                // SSL_new returns null on failure; never hand that on.
                if (ssl is null)
                {
                    error("SSL_new error: fd = ", sock.handle());
                    return null;
                }
                static if (IOMode == IO_MODE.iocp)
                {
                    // IOCP mode: TLS data is moved through memory BIOs.
                    BIO* readBIO = BIO_new(BIO_s_mem());
                    BIO* writeBIO = BIO_new(BIO_s_mem());
                    SSL_set_bio(ssl, readBIO, writeBIO);
                    SSL_set_accept_state(ssl);
                    tcpStream = new SSLSocket(loop, sock, ssl, readBIO, writeBIO);
                }
                else
                {
                    // SSL_set_fd returns 1 on success and 0 on failure, so a
                    // `< 0` test would never detect an error.
                    if (SSL_set_fd(ssl, sock.handle()) <= 0)
                    {
                        error("SSL_set_fd error: fd = ", sock.handle());
                        SSL_shutdown(ssl);
                        SSL_free(ssl);
                        return null;
                    }
                    SSL_set_accept_state(ssl);
                    tcpStream = new SSLSocket(loop, sock, ssl);
                }
            }
        }
        else
        {
            tcpStream = new TcpStream(loop, sock, bufferSize);
        }

        return tcpStream;
    }

    /**
     * Accept handler: asks the user callback for a connection object and,
     * when that connection reports itself active and a timing wheel exists,
     * adds it to the wheel.
     */
    void newConnect(TcpListener sender, TcpStream stream)
    {
        ServerConnection connection = _cback(sender, stream);
        assert(connection !is null);
        if (connection.active() && _wheel)
            _wheel.addNewTimer(connection);
    }

private:
    kiss.net.TcpListener.TcpListener _TcpListener; // created by bind()
    EventLoop _loop;                               // loop given to the constructor
    Address _bind;                                 // address given to bind()
private:
    NewConnection _cback;                          // set by setNewConntionCallBack()
private:
    TimingWheel _wheel;                            // created by startTimeout()
    KissTimer _timer;                              // ticks _wheel.prevWheel()
    uint _timeout;                                 // seconds, as passed to startTimeout()
}